package com.cloudansys.core.flink.function;

import com.cloudansys.core.entity.MultiDataEntity;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.*;

/**
 * Window function that derives one representative value per sensor from the samples
 * collected in a window, after discarding a single maximum and a single minimum sample.
 *
 * <p>Entities are grouped by the pair (projectId, serialCode). For every group the second
 * metric ({@code getValues()[1]}) of each entity is collected in arrival order; one
 * occurrence of the largest and one occurrence of the smallest sample are dropped, and the
 * sample sitting at index {@code size / 2} of what remains is written into the third metric
 * ({@code getValues()[2]}) of the first entity seen for that group. That first entity is the
 * one emitted downstream.
 *
 * <p>The original author's note describes the window as 10 seconds long; the actual window
 * size is decided by whoever assigns the window and is not visible in this class.
 */
@Slf4j
public class RemoveMaxAndMinProcessor extends ProcessWindowFunction<List<MultiDataEntity>, List<MultiDataEntity>, String, TimeWindow> {

    /** Index inside {@code getValues()} of the metric that is sampled (the second metric). */
    private static final int SAMPLE_INDEX = 1;

    /** Index inside {@code getValues()} that receives the computed window value (the third metric). */
    private static final int RESULT_INDEX = 2;

    /**
     * Collapses all entities of the window into one entity per sensor, carrying the trimmed
     * window value in its third metric.
     *
     * <p>Groups that end up with no sample after trimming (i.e. groups that contributed only
     * one or two usable samples) are still emitted, but their third metric is left untouched.
     * Entities whose sample is missing ({@code null} values array, array too short, or
     * {@code null} element) do not contribute a sample instead of failing the whole window.
     *
     * @param k        key of the keyed stream this window belongs to (not used here)
     * @param context  window context (not used here)
     * @param elements batches of entities that fell into the window
     * @param out      receives exactly one list containing the first entity of every sensor,
     *                 in first-seen order
     */
    @Override
    public void process(String k, Context context, Iterable<List<MultiDataEntity>> elements, Collector<List<MultiDataEntity>> out) throws Exception {
        // Keyed by the (projectId, serialCode) pair rather than by the concatenated string,
        // so that e.g. ("1", "23") and ("12", "3") are never merged into one sensor.
        Map<List<String>, List<Double>> samplesBySensor = new LinkedHashMap<>();
        Map<List<String>, MultiDataEntity> firstEntityBySensor = new LinkedHashMap<>();

        for (List<MultiDataEntity> batch : elements) {
            if (batch == null) {
                continue;
            }
            for (MultiDataEntity entity : batch) {
                if (entity == null) {
                    continue;
                }
                List<String> sensorKey = Arrays.asList(entity.getProjectId(), entity.getSerialCode());
                // The first entity seen for a sensor is the one that gets updated and emitted.
                firstEntityBySensor.putIfAbsent(sensorKey, entity);

                Double sample = sampleOf(entity);
                if (sample != null) {
                    samplesBySensor.computeIfAbsent(sensorKey, ignored -> new ArrayList<>()).add(sample);
                }
            }
        }

        for (Map.Entry<List<String>, List<Double>> entry : samplesBySensor.entrySet()) {
            Double windowValue = trimmedMiddle(entry.getValue());
            if (windowValue == null) {
                continue;
            }
            Double[] target = firstEntityBySensor.get(entry.getKey()).getValues();
            // The representative entity may itself carry a missing or short values array.
            if (target != null && target.length > RESULT_INDEX) {
                target[RESULT_INDEX] = windowValue;
            }
        }

        out.collect(new ArrayList<>(firstEntityBySensor.values()));
    }

    /**
     * Reads the sampled metric of an entity.
     *
     * @return the value at {@link #SAMPLE_INDEX}, or {@code null} when the entity has no
     *         usable value there
     */
    private static Double sampleOf(MultiDataEntity entity) {
        Double[] values = entity.getValues();
        if (values == null || values.length <= SAMPLE_INDEX) {
            return null;
        }
        return values[SAMPLE_INDEX];
    }

    /**
     * Removes one occurrence of the maximum and one occurrence of the minimum from
     * {@code samples} (mutating the list) and returns the element at index {@code size / 2}
     * of the remainder. The list keeps its arrival order; it is not sorted.
     *
     * @param samples non-null samples without {@code null} elements
     * @return the selected sample, or {@code null} when nothing is left after trimming
     */
    private static Double trimmedMiddle(List<Double> samples) {
        if (samples.isEmpty()) {
            return null;
        }
        DoubleSummaryStatistics stats = samples.stream().mapToDouble(Double::doubleValue).summaryStatistics();
        // Box explicitly so that List.remove(Object) is chosen: it removes the first equal
        // element by value and can never be confused with the index-based remove(int).
        samples.remove(Double.valueOf(stats.getMax()));
        samples.remove(Double.valueOf(stats.getMin()));
        if (samples.isEmpty()) {
            return null;
        }
        return samples.get(samples.size() / 2);
    }
}
